package com.flow.framework.common.util.io;

import com.flow.framework.common.constant.FrameworkCommonConstant;
import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.stream.handler.BatchProcessHandler;
import com.flow.framework.common.stream.handler.BatchReturnProcessHandler;
import com.flow.framework.common.stream.handler.BatchVoidProcessHandler;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * IO工具类
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/1/3
 */
@Slf4j
public final class IoUtil {

    private IoUtil() {
    }

    /**
     * 关闭可关闭对象
     *
     * @param closeables closeables
     */
    public static void close(AutoCloseable... closeables) {
        if (null == closeables || closeables.length == 0) {
            return;
        }
        for (AutoCloseable closeable : closeables) {
            if (null == closeable) {
                continue;
            }
            try {
                closeable.close();
            } catch (Exception e) {
                log.warn("close closeable error.", e);
            }
        }
    }

    /**
     * 将input stream转换成不超过给定限制大小的字节数组，否则抛出异常
     *
     * @param inputStream inputStream
     * @param limit       limit
     * @return
     */
    public static byte[] toLimitByteArray(InputStream inputStream, int limit) {
        List<Cache> caches = new LinkedList<>();
        byte[] bytes = new byte[FrameworkCommonConstant.DEFAULT_IO_BUFFER_SIZE];
        int len;
        int totalLength = 0;
        try {
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(bytes))) {
                totalLength += len;
                if (totalLength > limit) {
                    caches.add(new Cache(Arrays.copyOf(bytes, len)));
                    break;
                }
                caches.add(new Cache(Arrays.copyOf(bytes, len)));
            }
        } catch (Exception e) {
            log.error("read input stream error.", e);
            throw new CheckedException(SystemErrorCode.UNEXPECTED_ERROR, "read input stream error.", e);
        }
        return getLimitResultByteArray(limit, caches);
    }

    /**
     * 检查输入流长度是否超过限制并将input stream转换成不超过给定限制大小的字节数组，如果超过限制则抛出异常
     *
     * @param inputStream       inputStream
     * @param limit             limit
     * @param otherwiseCallback otherwiseCallback
     * @return byte数组
     */
    public static <E extends CheckedException> byte[] checkAndToLimitByteArray(InputStream inputStream, int limit,
                                                                               Supplier<E> otherwiseCallback) {
        List<Cache> caches = new LinkedList<>();
        byte[] bytes = new byte[FrameworkCommonConstant.DEFAULT_IO_BUFFER_SIZE];
        int len;
        int totalLength = 0;
        try {
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(bytes))) {
                totalLength += len;
                if (totalLength > limit) {
                    log.error("caches length over limit. limit : {}", limit);
                    throw otherwiseCallback.get();
                }
                caches.add(new Cache(Arrays.copyOf(bytes, len)));
            }
        } catch (Exception e) {
            log.error("read input stream error.", e);
            throw new CheckedException(SystemErrorCode.UNEXPECTED_ERROR, "read input stream error.", e);
        }
        return getLimitResultByteArray(limit, caches);
    }

    private static byte[] getLimitResultByteArray(int limitSize, List<Cache> caches) {
        byte[] tempResult = new byte[limitSize];
        int nextDestPos = 0;
        for (Cache cache : caches) {
            byte[] data = cache.getBytes();
            int dataLen = data.length;
            int freeLen = limitSize - nextDestPos;
            int addLen = Math.min(freeLen, dataLen);
            System.arraycopy(data, 0, tempResult, nextDestPos, addLen);
            nextDestPos += addLen;
        }
        if (0 == nextDestPos) {
            return new byte[0];
        }

        // 如果实际长度等于限制长度，则直接返回
        int realLen = nextDestPos;
        if (realLen == limitSize) {
            return tempResult;
        }

        // 如果实际长度小于限制长度，则需要重新复制到新数组并返回
        byte[] result = new byte[realLen];
        System.arraycopy(tempResult, 0, result, 0, realLen);
        return result;
    }

    /**
     * 输入流转输出流
     *
     * @param input         input
     * @param output        output
     * @param errorCallback 出现异常后需要抛出的异常
     * @param <E>           E
     */
    public static <E extends CheckedException> void copy(InputStream input, OutputStream output,
                                                         Function<Exception, E> errorCallback) {
        byte[] buffer = new byte[FrameworkCommonConstant.DEFAULT_IO_BUFFER_SIZE];
        int len;
        try {
            while (FrameworkCommonConstant.EOF != (len = input.read(buffer))) {
                output.write(buffer, 0, len);
            }
        } catch (Exception e) {
            log.error("io copy error.", e);
            throw errorCallback.apply(e);
        }
    }

    public static <E extends CheckedException> long copyAndGetSize(InputStream input, OutputStream output,
                                                                   Function<Exception, E> errorCallback) {
        byte[] buffer = new byte[FrameworkCommonConstant.DEFAULT_IO_BUFFER_SIZE];
        int len;
        long count = 0;
        try {
            while (FrameworkCommonConstant.EOF != (len = input.read(buffer))) {
                output.write(buffer, 0, len);
                count += len;
            }
        } catch (Exception e) {
            log.error("io copy and get size error.", e);
            throw errorCallback.apply(e);
        }
        return count;
    }

    /**
     * 输入流转输出流并实时批量处理流数据
     *
     * @param inputStream               输入流
     * @param outputStream              输出流
     * @param batchReturnProcessHandler 批量处理器
     * @param errorCallback             出错回调
     * @param <E>                       检查异常
     */
    public static <E extends CheckedException> void copyAndProcess(InputStream inputStream,
                                                                   OutputStream outputStream,
                                                                   BatchReturnProcessHandler batchReturnProcessHandler,
                                                                   Function<Exception, E> errorCallback) {
        checkInputStreamAndHandler(inputStream, batchReturnProcessHandler);
        int batchSize = batchReturnProcessHandler.getBatchSize();
        byte[] buffer = new byte[batchSize];
        int len;
        try {
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(buffer))) {
                byte[] resultBuffer = wrapByteArray(len, batchSize, batchReturnProcessHandler, buffer);
                outputStream.write(resultBuffer);
            }
        } catch (Exception e) {
            log.error("io copy and process error.", e);
            throw errorCallback.apply(e);
        }
    }

    /**
     * 输入流转输出流并实时批量处理流数据
     *
     * @param inputStream               输入流
     * @param outputStream              输出流
     * @param batchReturnProcessHandler 批量处理器
     * @param errorCallback             出错回调
     * @param <E>                       检查异常
     * @return 数据长度
     */
    public static <E extends CheckedException> long copyAndProcessAndGetSize(InputStream inputStream,
                                                                             OutputStream outputStream,
                                                                             BatchReturnProcessHandler batchReturnProcessHandler,
                                                                             Function<Exception, E> errorCallback) {
        checkInputStreamAndHandler(inputStream, batchReturnProcessHandler);
        int batchSize = batchReturnProcessHandler.getBatchSize();
        byte[] buffer = new byte[batchSize];
        int len;
        long count = 0;
        try {
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(buffer))) {
                byte[] resultBuffer = wrapByteArray(len, batchSize, batchReturnProcessHandler, buffer);
                int length = resultBuffer.length;
                outputStream.write(resultBuffer, 0, length);
                count += length;
            }
            return count;
        } catch (Exception e) {
            log.error("io copy and process and get size error.", e);
            throw errorCallback.apply(e);
        }
    }

    /**
     * 批量处理输入流
     *
     * @param inputStream             输入流
     * @param batchVoidProcessHandler 实际处理的handler
     * @param errorCallback           出错回调
     * @param <E>                     检查异常
     */
    public static <E extends CheckedException> void handleInputStream(InputStream inputStream,
                                                                      BatchVoidProcessHandler batchVoidProcessHandler,
                                                                      Function<Exception, E> errorCallback) {
        checkInputStreamAndHandler(inputStream, batchVoidProcessHandler);
        int batchSize = batchVoidProcessHandler.getBatchSize();
        int len;
        byte[] bytes = new byte[batchSize];
        try {
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(bytes))) {
                if (len == batchSize) {
                    batchVoidProcessHandler.process(bytes);
                } else {
                    byte[] temp = new byte[len];
                    System.arraycopy(bytes, 0, temp, 0, len);
                    batchVoidProcessHandler.process(temp);
                }
            }
        } catch (Exception e) {
            log.error("batch process input stream error.", e);
            throw errorCallback.apply(e);
        }
    }

    /**
     * 将输入流写入文件
     *
     * @param inputStream   输入流
     * @param destFile      需要写入的目标文件
     * @param errorCallback 出错回调
     * @param <E>           检查异常
     */
    public static <E extends CheckedException> void handleInputStreamToFile(InputStream inputStream,
                                                                            File destFile,
                                                                            Function<Exception, E> errorCallback) {
        if (null == inputStream) {
            log.error("input stream is null.");
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }
        int len;
        byte[] bytes = new byte[FrameworkCommonConstant.DEFAULT_IO_BUFFER_SIZE];
        OutputStream outputStream = null;
        try {
            outputStream = new BufferedOutputStream(new FileOutputStream(destFile));
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(bytes))) {
                outputStream.write(bytes, 0, len);
            }
        } catch (Exception e) {
            log.error("handle input stream to file error.", e);
            throw errorCallback.apply(e);
        } finally {
            close(outputStream);
        }
    }

    /**
     * 批量处理输入流并写入文件
     *
     * @param inputStream               输入流
     * @param batchReturnProcessHandler 实际处理的handler
     * @param destFile                  处理完后需要写入的目标文件
     * @param errorCallback             出错回调
     * @param <E>                       检查异常
     */
    public static <E extends CheckedException> void handleInputStreamToFile(InputStream inputStream,
                                                                            BatchReturnProcessHandler batchReturnProcessHandler,
                                                                            File destFile,
                                                                            Function<Exception, E> errorCallback) {
        checkInputStreamAndHandler(inputStream, batchReturnProcessHandler);
        int batchSize = batchReturnProcessHandler.getBatchSize();
        int len;
        byte[] bytes = new byte[batchSize];
        OutputStream outputStream = null;
        try {
            outputStream = new BufferedOutputStream(new FileOutputStream(destFile));
            while (FrameworkCommonConstant.EOF != (len = inputStream.read(bytes))) {
                byte[] resultBytes;
                if (len == batchSize) {
                    resultBytes = batchReturnProcessHandler.process(bytes);
                } else {
                    byte[] temp = new byte[len];
                    System.arraycopy(bytes, 0, temp, 0, len);
                    resultBytes = batchReturnProcessHandler.process(temp);
                }
                outputStream.write(resultBytes);
            }
        } catch (Exception e) {
            log.error("batch process input stream to file error.", e);
            throw errorCallback.apply(e);
        } finally {
            close(outputStream);
        }
    }


    private static byte[] wrapByteArray(int len, int batchSize, BatchReturnProcessHandler batchReturnProcessHandler, byte[] buffer) {
        if (len == batchSize) {
            return batchReturnProcessHandler.process(buffer);
        }
        byte[] temp = new byte[len];
        System.arraycopy(buffer, 0, temp, 0, len);
        return batchReturnProcessHandler.process(temp);
    }

    public static void checkInputStreamAndHandler(InputStream inputStream, BatchProcessHandler batchProcessHandler) {
        if (null == inputStream) {
            log.error("input stream is null.");
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }
        checkBatchProcessHandler(batchProcessHandler);
    }

    public static void checkOutputStreamAndHandler(OutputStream outputStream, BatchProcessHandler batchProcessHandler) {
        if (null == outputStream) {
            log.error("output stream is null.");
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }
        checkBatchProcessHandler(batchProcessHandler);
    }

    private static void checkBatchProcessHandler(BatchProcessHandler batchProcessHandler) {
        if (null == batchProcessHandler) {
            log.error("batch process handler is null.");
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }

        int batchSize = batchProcessHandler.getBatchSize();
        if (batchSize <= 0 || batchSize > BatchProcessHandler.MAX_BUFFER_SIZE) {
            log.error("batch buffer size error. batch size : {}", batchSize);
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }
    }


    @Data
    @AllArgsConstructor
    private static class Cache {

        private byte[] bytes;
    }
}
